/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package whisk.core.entity

import java.time.Instant

import scala.concurrent.Future
import scala.language.postfixOps
import scala.util.Try
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import spray.json.JsNumber
import spray.json.JsObject
import spray.json.JsString
import spray.json.RootJsonFormat
import whisk.common.Logging
import whisk.common.TransactionId
import whisk.core.ConfigKeys
import whisk.core.database.ArtifactStore
import whisk.core.database.ArtifactStoreProvider
import whisk.core.database.DocumentRevisionProvider
import whisk.core.database.DocumentSerializer
import whisk.core.database.StaleParameter
import whisk.spi.SpiLoader
import pureconfig._
import scala.reflect.classTag

/** Shorthand type aliases for artifact stores specialized to a document type. */
package object types {
  // Store of WhiskAuth documents.
  type AuthStore = ArtifactStore[WhiskAuth]
  // Store of WhiskEntity documents.
  type EntityStore = ArtifactStore[WhiskEntity]
}

/**
 * Database configuration naming the design documents to use.
 *
 * An instance is loaded from `ConfigKeys.db` by `WhiskEntityQueries`, which uses
 * `actionsDdoc` as its default design document.
 *
 * @param actionsDdoc design document used by default for entity queries
 * @param activationsDdoc presumably the design document for activation views (not used in this file; confirm at use site)
 * @param activationsFilterDdoc presumably the design document for filtered activation views (not used in this file; confirm at use site)
 */
case class DBConfig(actionsDdoc: String, activationsDdoc: String, activationsFilterDdoc: String)

/**
 * A document that can be serialized for the datastore and that carries
 * an identifier and a revision.
 */
protected[core] trait WhiskDocument extends DocumentSerializer with DocumentRevisionProvider {

  /**
   * The unique identifier of this document.
   */
  protected def docid: DocId

  /**
   * Combines the document identifier with the current revision
   * (which may be unset) into a DocInfo.
   */
  protected[core] final def docinfo: DocInfo = DocInfo(docid, rev)

  /**
   * JSON representation of the document, e.g. for REST calls.
   * The identifier and revision are not part of it.
   */
  def toJson: JsObject

  /**
   * JSON representation stored in the database: the fields of `toJson`
   * plus `_id`, and `_rev` when the revision is set. May differ from
   * `toJson` in exceptional cases.
   */
  override def toDocumentRecord: JsObject = {
    val identifier = docid.id
    // The underlying revision string is null when the document has no revision yet.
    val revision = Option(rev.rev)

    val identified = this.toJson.fields + ("_id" -> JsString(identifier))
    val record = revision match {
      case Some(r) => identified + ("_rev" -> JsString(r))
      case None    => identified
    }
    JsObject(record)
  }
}

/** Factory for the artifact store holding [[WhiskAuth]] documents. */
object WhiskAuthStore {

  /** Document reader made available implicitly when the store is created. */
  implicit val docReader = WhiskDocumentReader

  /**
   * Creates the store through the [[ArtifactStoreProvider]] resolved by the SPI loader.
   */
  def datastore()(implicit system: ActorSystem, logging: Logging, materializer: ActorMaterializer) = {
    val provider = SpiLoader.get[ArtifactStoreProvider]
    provider.makeStore[WhiskAuth]()
  }
}

/** Factory for the artifact store holding [[WhiskEntity]] documents. */
object WhiskEntityStore {

  /**
   * Creates the store through the [[ArtifactStoreProvider]] resolved by the SPI loader.
   * The class tag, JSON format and document reader are supplied explicitly
   * instead of being resolved implicitly.
   */
  def datastore()(implicit system: ActorSystem, logging: Logging, materializer: ActorMaterializer) = {
    val provider = SpiLoader.get[ArtifactStoreProvider]
    provider.makeStore[WhiskEntity]()(
      classTag[WhiskEntity],
      WhiskEntityJsonFormat,
      WhiskDocumentReader,
      system,
      logging,
      materializer)
  }
}

/** Factory for the artifact store holding [[WhiskActivation]] documents. */
object WhiskActivationStore {

  /** Document reader made available implicitly when the store is created. */
  implicit val docReader = WhiskDocumentReader

  /**
   * Creates the store through the [[ArtifactStoreProvider]] resolved by the SPI loader,
   * requesting batching (`useBatching = true`).
   */
  def datastore()(implicit system: ActorSystem, logging: Logging, materializer: ActorMaterializer) = {
    val provider = SpiLoader.get[ArtifactStoreProvider]
    provider.makeStore[WhiskActivation](useBatching = true)
  }
}

/**
 * Identifies a view inside a design document of a database.
 *
 * @param ddoc the design document containing the view
 * @param view the name of the view within that design document
 */
protected[core] case class View(ddoc: String, view: String) {

  /** The name of the table to query, in the form `ddoc/view`. */
  val name: String = ddoc + "/" + view
}

/**
 * Utilities for querying the whisk datastore.
 *
 * The datastore is assumed to have views (pre-computed joins or indexes)
 * for each of the whisk collection types. Entities may be queried by
 * [path, date] where
 *
 * - path is either the root namespace of an entity (the owning subject
 *   or organization) or a package-qualified namespace,
 * - date is the date the entity was created or last updated; for activations
 *   it is the start of the activation. See EntityRecord for the last updated
 *   property.
 *
 * The order of the key fields matters because the datastore is assumed to sort
 * lexicographically: the fields are ordered to match the desired queries, i.e.
 * all entities in a namespace (by type), further refined by date, further
 * refined by name.
 */
object WhiskEntityQueries {

  /** High sentinel used as the upper bound when building end keys. */
  val TOP: String = "\ufff0"

  /** The design document to use for queries, read from the database configuration. */
  val designDoc: String = {
    val dbConfig = loadConfigOrThrow[DBConfig](ConfigKeys.db)
    dbConfig.actionsDdoc
  }

  /**
   * The view for a collection within a design document.
   *
   * @param ddoc the design document, defaults to `designDoc`
   * @param collection the collection, used as the view name
   */
  def view(ddoc: String = designDoc, collection: String): View = View(ddoc, collection)
}

/**
 * Query helpers for one collection (i.e., type) of the whisk datastore.
 *
 * @tparam T the type that full documents are deserialized to
 */
trait WhiskEntityQueries[T] {
  import WhiskEntityQueries.TOP

  /** Name of the collection; it doubles as the view name and as the key of the count result. */
  val collectionName: String

  /** JSON format used to read full documents as `T`. */
  val serdes: RootJsonFormat[T]

  /** The view for this collection, within the default design document. */
  lazy val view: View = WhiskEntityQueries.view(collection = collectionName)

  /**
   * Lists the records of a collection (i.e., type) that match the given path
   * (which should be one namespace, or namespace + package name), optionally
   * bounded by a time range.
   *
   * @return Left with the records as JSON objects when `includeDocs` is false,
   *         Right with the records read as `T` when `includeDocs` is true
   */
  def listCollectionInNamespace[A <: WhiskEntity](
    db: ArtifactStore[A],
    path: EntityPath, // either a namespace or a namespace + package name
    skip: Int,
    limit: Int,
    includeDocs: Boolean = false,
    since: Option[Instant] = None,
    upto: Option[Instant] = None,
    stale: StaleParameter = StaleParameter.No,
    viewName: View = view)(implicit transid: TransactionId): Future[Either[List[JsObject], List[T]]] = {
    // Full documents are only deserialized when they were requested.
    val reader = if (includeDocs) Some((json: JsObject) => Try(serdes.read(json))) else None
    // Keys are [path, date(, name)]; missing bounds fall back to 0 and TOP respectively.
    val lowerKey = List(path.asString, since.map(_.toEpochMilli).getOrElse(0))
    val upperKey = List(path.asString, upto.map(_.toEpochMilli).getOrElse(TOP), TOP)
    query(db, viewName, lowerKey, upperKey, skip, limit, reduce = false, stale, reader)
  }

  /**
   * Counts the records of a collection (i.e., type) that match the given path
   * (which should be one namespace, or namespace + package name), optionally
   * bounded by a time range.
   *
   * @return JSON object with a single key, the collection name, whose value is the count
   */
  def countCollectionInNamespace[A <: WhiskEntity](
    db: ArtifactStore[A],
    path: EntityPath, // either a namespace or a namespace + package name
    skip: Int,
    since: Option[Instant] = None,
    upto: Option[Instant] = None,
    stale: StaleParameter = StaleParameter.No,
    viewName: View = view)(implicit transid: TransactionId): Future[JsObject] = {
    implicit val ec = db.executionContext
    // Same key layout as for listing: [path, date(, name)].
    val lowerKey = List(path.asString, since.map(_.toEpochMilli).getOrElse(0))
    val upperKey = List(path.asString, upto.map(_.toEpochMilli).getOrElse(TOP), TOP)
    for (total <- db.count(viewName.name, lowerKey, upperKey, skip, stale))
      yield JsObject(collectionName -> JsNumber(total))
  }

  /**
   * Runs a descending query against a view and shapes the resulting rows.
   *
   * When `convert` is defined, full documents are requested and each row's `doc`
   * field is converted; rows whose conversion fails are dropped. Otherwise the
   * rows are normalized and rows that cannot be normalized are dropped.
   *
   * @return Right with the converted documents when `convert` is defined,
   *         Left with the normalized rows otherwise
   */
  protected[entity] def query[A <: WhiskEntity](
    db: ArtifactStore[A],
    view: View,
    startKey: List[Any],
    endKey: List[Any],
    skip: Int,
    limit: Int,
    reduce: Boolean,
    stale: StaleParameter = StaleParameter.No,
    convert: Option[JsObject => Try[T]])(implicit transid: TransactionId): Future[Either[List[JsObject], List[T]]] = {
    implicit val ec = db.executionContext
    db.query(view.name, startKey, endKey, skip, limit, convert.isDefined, descending = true, reduce, stale)
      .map { rows =>
        convert match {
          case Some(fn) =>
            Right(rows.flatMap(row => fn(row.fields("doc").asJsObject).toOption))
          case None =>
            Left(rows.flatMap(row => normalizeRow(row, reduce).toOption))
        }
      }
  }

  /**
   * Normalizes a raw row returned by the datastore: a reduced row is returned
   * unchanged, any other row is replaced by its `value` field.
   *
   * @return the normalized row, or a Failure when `value` is missing or not an object
   */
  protected def normalizeRow(row: JsObject, reduce: Boolean): Try[JsObject] = Try {
    if (reduce) row else row.fields("value").asJsObject
  }
}
